import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;
import java.util.Arrays;

public class AggregateFunctions{
            public static void main(String[] args) throws Exception
            {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);


                    DataStream<person> ds1 = env.fromCollection(Arrays.asList(
                            new person( "Latte",1,6),
                            new person( "Milk",2,3),
                            new person( "Breve",3,5),
                            new person("Mocha",4,8),
                            new person("Tea",5,4)
                    ));



        tEnv.createTemporaryView("MyTable",ds1);
// call function "inline" without registration in Table API
        Table result1=tEnv .from("MyTable")
                .groupBy($("name"))
                .select($("name"), call(WeightedAvg.class, $("value"), $("weight")));

// register function
        tEnv.createTemporarySystemFunction("WeightedAvg", WeightedAvg.class);

//// call registered function in Table API
        Table result2=tEnv.from("MyTable")
                .groupBy($("name"))
                .select($("name"), call("WeightedAvg", $("value"), $("weight")));

// call registered function in SQL
        Table result3=tEnv.sqlQuery("SELECT name, WeightedAvg(`value`, weight) FROM MyTable GROUP BY name");


        tEnv.toRetractStream(result1, Row.class).print();
        tEnv.toRetractStream(result2, Row.class).print();
        tEnv.toRetractStream(result3, Row.class).print();
        env.execute();

    }

}
